package com.atguigu.tingshu.search.service.impl;

import co.elastic.clients.elasticsearch.ElasticsearchClient;
import co.elastic.clients.elasticsearch._types.FieldValue;
import co.elastic.clients.elasticsearch._types.SortOrder;
import co.elastic.clients.elasticsearch._types.aggregations.Aggregate;
import co.elastic.clients.elasticsearch._types.query_dsl.BoolQuery;
import co.elastic.clients.elasticsearch.core.SearchRequest;
import co.elastic.clients.elasticsearch.core.SearchResponse;
import co.elastic.clients.elasticsearch.core.search.HitsMetadata;
import co.elastic.clients.elasticsearch.core.search.Suggestion;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tingshu.album.client.AlbumClientFeign;
import com.atguigu.tingshu.common.constant.SystemConstant;
import com.atguigu.tingshu.model.album.AlbumInfo;
import com.atguigu.tingshu.model.album.BaseCategory3;
import com.atguigu.tingshu.model.album.BaseCategoryView;
import com.atguigu.tingshu.model.search.AlbumInfoIndex;
import com.atguigu.tingshu.model.search.SuggestIndex;
import com.atguigu.tingshu.model.user.UserInfo;
import com.atguigu.tingshu.query.search.AlbumIndexQuery;
import com.atguigu.tingshu.search.dao.AlbumInfoDao;
import com.atguigu.tingshu.search.service.SearchService;
import com.atguigu.tingshu.user.client.UserClientFeign;
import com.atguigu.tingshu.vo.album.AlbumStatVo;
import com.atguigu.tingshu.vo.search.AlbumInfoIndexVo;
import com.atguigu.tingshu.vo.search.AlbumSearchResponseVo;
import com.atguigu.tingshu.vo.user.UserInfoVo;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.stereotype.Service;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@Slf4j
@Service
@AllArgsConstructor
@SuppressWarnings({"unchecked", "rawtypes"})
public class SearchServiceImpl implements SearchService {

    private AlbumInfoDao albumInfoDao;

    private UserClientFeign userClientFeign;

    private AlbumClientFeign albumClientFeign;

    private ElasticsearchClient elasticsearchClient;

    private ThreadPoolExecutor threadPoolExecutor;


    /**
     * 获取首页一级分类下 热门TOP7三级分类的 热门Top6的专辑数据
     *
     * @param category1Id 一级分类ID
     * @return 专辑数据
     */
    @SneakyThrows
    @Override
    public Object channel(Long category1Id) {
        // 查询首页一级分类的热门Top7三级分类列表
        List<BaseCategory3> baseCategory3List = albumClientFeign.findTopBaseCategory3(category1Id);

        // list转map key:id  value:对象本身 方便后续查询操作
        Map<Long, BaseCategory3> baseCategory3Map =
                baseCategory3List.stream().collect(Collectors.toMap(
                        key -> key.getId(),
                        value -> value
                ));

        // 获取Top7三级分类的id 转换类型用于后续查询
        List<FieldValue> category3IdValueList = baseCategory3List.stream().map(baseCategory3 -> {
            // 初始化
            return FieldValue.of(baseCategory3.getId());
        }).collect(Collectors.toList());

        // 查询条件对象初始化
        SearchRequest.Builder builder = new SearchRequest.Builder();
        // 指定查询的索引
        builder.index("albuminfo");
        // 构建查询条件:匹配category3Id的值是否在category3IdValueList中
        builder.query(
                query -> query.terms(
                        terms -> terms.field("category3Id").terms(
                                value -> value.value(category3IdValueList)
                        )
                )
        );

        // 构建分桶语句:根据三级分类分桶 并将每个桶的数据根据热度降序排序 取前6条
        builder.aggregations(
                "aggCategory3Id", // 别名
                agg -> agg.terms(
                        terms -> terms.field("category3Id")
                ).aggregations(
                        "topHitsInfo",
                        subAgg -> subAgg.topHits(
                                topHits ->
                                        topHits.sort(
                                                sort -> sort.field(
                                                        field -> field.field("hotScore").order(SortOrder.Desc)// 根据hotscore排序,降序
                                                )
                                        ).size(6)// 取前6条
                        )
                )
        );

        // 执行查询:去es中获取Top7的三级分类下全部的专辑信息
        SearchResponse<AlbumInfoIndex> searchResponse = elasticsearchClient.search(builder.build(), AlbumInfoIndex.class);

        // 聚合结果
        Map<String, Aggregate> aggregations = searchResponse.aggregations();
        if (aggregations != null) {
            // 获取三级分类的全部的桶的聚合结果
            Aggregate aggregate = aggregations.get("aggCategory3Id");

            return aggregate.lterms().buckets().array().stream().map(longTermsBucket -> {

                JSONObject jsonObject = new JSONObject();
                // 获取这个对象的值
                long category3Id = longTermsBucket.key();
                // 保存三级分类
                jsonObject.put("baseCategory3", baseCategory3Map.get(category3Id));
                // 获取子聚合
                Map<String, Aggregate> subAggregations = longTermsBucket.aggregations();

                if (subAggregations != null) {
                    // 根据别名获取结果
                    Aggregate topHitsInfoAggregate = subAggregations.get("topHitsInfo");
                    // 获取桶里的文档数据
                    List<AlbumInfoIndex> albumInfoIndexList =
                            topHitsInfoAggregate.topHits().hits().hits().stream().map(album -> {
                                return album.source().to(AlbumInfoIndex.class);
                            }).collect(Collectors.toList());
                    // 存储Top6热门专辑数据
                    jsonObject.put("list", albumInfoIndexList);
                }
                // 返回
                return jsonObject;

            }).collect(Collectors.toList());
        }
        // 返回
        return null;
    }

    /**
     * 搜索专辑
     *
     * @param albumIndexQuery 专辑搜索vo实体
     * @return 专辑数据
     */
    @SneakyThrows
    @Override
    public Object search(AlbumIndexQuery albumIndexQuery) {
        // 构建搜索请求对象
        SearchRequest.Builder builder = new SearchRequest.Builder();
        // 设置索引
        builder.index("albuminfo");
        // 构建搜索条件
        builder = buildSearchParams(builder, albumIndexQuery);
        // 执行查询
        SearchResponse<AlbumInfoIndex> searchResponse = elasticsearchClient.search(builder.build(), AlbumInfoIndex.class);
        // 解析结果
        AlbumSearchResponseVo vo = getSearchResult(searchResponse);
        // 补全
        vo.setPageNo(albumIndexQuery.getPageNo());
        Integer pageSize = albumIndexQuery.getPageSize();
        vo.setPageSize(pageSize);
        // 计算总页码
        Long total = vo.getTotal();
        vo.setTotalPages(total % pageSize == 0 ? total / pageSize : total / pageSize + 1);
        // 返回结果
        return vo;
    }

    /**
     * 提示词补全
     *
     * @param keywords 关键字
     * @return 提示词列表
     */
    @SneakyThrows
    @Override
    public Object completeSuggest(String keywords) {
        // 查询条件对象初始化
        SearchRequest.Builder builder = new SearchRequest.Builder();
        // 指定查询的索引
        builder.index("suggestinfo");
        // 构建查询条件:匹配关键字
        builder.suggest(suggest -> suggest
                .suggesters("suggestKeyword",
                        s -> s.prefix(keywords)
                                .completion(c -> c
                                        .field("keyword")       // 匹配的域
                                        .size(10)               // 返回的数量
                                        .skipDuplicates(true)   // 去重
                                        .fuzzy(f -> f.fuzziness("auto"))  // 模糊匹配
                                )
                ).suggesters("suggestPinyin",
                        s -> s.prefix(keywords)
                                .completion(c -> c
                                        .field("keywordPinyin")
                                        .size(10)
                                        .skipDuplicates(true)
                                        .fuzzy(f -> f.fuzziness("auto"))
                                )
                ).suggesters("suggestSequence",
                        s -> s.prefix(keywords)
                                .completion(c -> c
                                        .field("keywordSequence")
                                        .size(10)
                                        .skipDuplicates(true)
                                        .fuzzy(f -> f.fuzziness("auto"))
                                )
                )
        );
        // 执行搜索:去es中获取提示词列表
        SearchResponse<SuggestIndex> searchResponse = elasticsearchClient.search(builder.build(), SuggestIndex.class);
        // 获取提示词命中的数据
        Map<String, List<Suggestion<SuggestIndex>>> suggestMap = searchResponse.suggest();
        if (suggestMap != null) {
            // 解析汉字的结果
            List<String> list1 = getSuggestResult(suggestMap, "suggestKeyword");
            // 解析拼音的结果
            List<String> list2 = getSuggestResult(suggestMap, "suggestPinyin");
            // 解析拼音首字母的结果
            List<String> list3 = getSuggestResult(suggestMap, "suggestSequence");
            // 合并三个结果
            return Stream.of(list1, list2, list3).flatMap(Collection::stream)
                    .collect(Collectors.toSet())// 去重
                    .stream()
                    .limit(10) // 取10条
                    .collect(Collectors.toList());
        }
        return null;
    }

    /**
     * 查询专辑的详情信息
     *
     * @param albumId 专辑ID
     * @return 专辑详情信息
     */
    @Override
    public Object getAlbumInfo(Long albumId) {

        Map<String, Object> result = new ConcurrentHashMap<>();

        // 判断布隆过滤器中是否包含专辑信息
        if (albumClientFeign.getAlbumFromBloom(albumId)) {

            // 任务1：根据专辑id 获取专辑信息
            CompletableFuture<AlbumInfo> future1 = CompletableFuture.supplyAsync(() -> {
                AlbumInfo albumInfo = albumClientFeign.getAlbumInfo(albumId);
                if (albumInfo == null && albumInfo.getId() == null) {
                    return null;
                }
                result.put("albumInfo", albumInfo);
                return albumInfo;
            }, threadPoolExecutor);

            // 任务2：分类的数据
            CompletableFuture<Void> future2 = future1.thenAcceptAsync(albumInfo -> {
                if (albumInfo == null) {
                    return;
                }
                BaseCategoryView baseCategoryView = albumClientFeign.getBaseCategoryView(albumInfo.getCategory3Id());
                result.put("baseCategoryView", baseCategoryView);
            }, threadPoolExecutor);

            // 任务3：统计的数据
            CompletableFuture<Void> future3 = future1.thenAcceptAsync(albumInfo -> {
                if (albumInfo == null) {
                    return;
                }
                Map<String, Object> albumStatMap = albumClientFeign.getAlbumStatMap(albumId);
                AlbumStatVo albumStatVo = new AlbumStatVo();
                albumStatVo.setAlbumId(albumId);
                albumStatVo.setPlayStatNum(Long.valueOf(albumStatMap.get(SystemConstant.ALBUM_STAT_PLAY).toString()));
                albumStatVo.setSubscribeStatNum(Long.valueOf(albumStatMap.get(SystemConstant.ALBUM_STAT_SUBSCRIBE).toString()));
                albumStatVo.setBuyStatNum(Long.valueOf(albumStatMap.get(SystemConstant.ALBUM_STAT_BROWSE).toString()));
                albumStatVo.setCommentStatNum(Long.valueOf(albumStatMap.get(SystemConstant.ALBUM_STAT_COMMENT).toString()));
                result.put("albumStatVo", albumStatVo);
            }, threadPoolExecutor);

            // 任务4：作者的数据
            CompletableFuture<Void> future4 = future1.thenAcceptAsync(albumInfo -> {
                if (albumInfo == null) {
                    return;
                }
                UserInfo userInfo = userClientFeign.getUserInfo(albumInfo.getUserId());
                UserInfoVo userInfoVo = new UserInfoVo();
                BeanUtils.copyProperties(userInfo, userInfoVo);
                result.put("announcer", userInfoVo);
            }, threadPoolExecutor);

            // 等待全部结束
            CompletableFuture.allOf(future2, future3).join();
        }
        // 返回
        return result;
    }

    /**
     * 解析提示词结果
     *
     * @param suggestMap 提示词结果
     * @param name       提示词名称
     * @return 提示词列表
     */
    private CopyOnWriteArrayList<String> getSuggestResult(Map<String, List<Suggestion<SuggestIndex>>> suggestMap, String name) {
        // 初始化
        CopyOnWriteArrayList<String> listResult = new CopyOnWriteArrayList<>();
        // 解析匹配的结果并记录
        suggestMap.get(name).get(0).completion().options().stream().forEach(c -> {
            // 记录一个结果
            listResult.add(c.source().getTitle());
        });
        // 返回
        return listResult;
    }

    /**
     * 构建搜索条件
     *
     * @param builder         搜索条件构造器
     * @param albumIndexQuery 专辑搜索vo实体
     * @return 查询条件
     */
    private SearchRequest.Builder buildSearchParams(SearchRequest.Builder builder, AlbumIndexQuery albumIndexQuery) {
        // 组合查询构造器
        BoolQuery.Builder boolQueryBuilder = new BoolQuery.Builder();

        // 获取关键字条件
        String keyword = albumIndexQuery.getKeyword();
        if (StringUtils.isNotEmpty(keyword)) {
            // 组合查询 专辑标题、简介、作者字段与关键字做匹配查询
            boolQueryBuilder
                    .should(should -> should.match(match -> match.field("albumTitle").query(keyword)))
                    .should(should -> should.match(match -> match.field("albumIntro").query(keyword)))
                    .should(should -> should.match(match -> match.field("announcerName").query(keyword)));
        }

        // 设置过滤条件: 一级分类
        Long category1Id = albumIndexQuery.getCategory1Id();
        if (category1Id != null) {
            boolQueryBuilder.filter(filter -> filter.term(
                    term -> term.field("category1Id").value(category1Id)
            ));
        }

        // 设置过滤条件: 二级分类
        Long category2Id = albumIndexQuery.getCategory2Id();
        if (category2Id != null) {
            boolQueryBuilder.filter(filter -> filter.term(
                    term -> term.field("category2Id").value(category2Id)
            ));
        }

        // 设置过滤条件: 三级分类
        Long category3Id = albumIndexQuery.getCategory3Id();
        if (category3Id != null) {
            boolQueryBuilder.filter(filter -> filter.term(
                    term -> term.field("category3Id").value(category3Id)
            ));
        }

        // 获取标签条件
        List<String> attributeList = albumIndexQuery.getAttributeList();
        if (attributeList != null && attributeList.size() > 0) {
            // 遍历拼接条件
            attributeList.stream().forEach(attribute -> {
                // 切分: 0-属性id must 1-属性值id
                String[] split = attribute.split(":");
                // 添加条件
                boolQueryBuilder.filter(
                        filter -> filter.nested(nested -> nested
                                .path("attributeValueIndexList")
                                .query(query -> query.bool(
                                        bool -> bool
                                                .must(m -> m.term(t -> t.field("attributeValueIndexList.attributeId").value(split[0])))// 名称id相等
                                                .must(m -> m.term(t -> t.field("attributeValueIndexList.valueId").value(split[1])))// 属性值id相等
                                ))
                        )
                );
            });
        }
        // 保存查询条件: where
        builder.query(boolQueryBuilder.build()._toQuery());

        // 设置分页参数
        Integer pageNo = albumIndexQuery.getPageNo();
        Integer pageSize = albumIndexQuery.getPageSize();
        builder.from((pageNo - 1) * pageSize);
        builder.size(pageSize);

        // 设置高亮显示
        builder.highlight(highlight -> highlight.fields(
                "albumTitle",
                h -> h.preTags("<font style=color:red>").postTags("</font>")
        ));

        // 设置搜索排序参数 根据用户选择的排序规则进行排序
        String order = albumIndexQuery.getOrder();
        if (StringUtils.isNotEmpty(order)) {
            // 0-域 1-规则
            String[] split = order.split(":");
            switch (split[0]) {
                // 综合
                case "1" -> builder.sort(s -> s.field(f -> f.field("id").order(SortOrder.Desc)));
                // 播放量
                case "2" -> builder.sort(s -> s.field(f -> f.field("playStatNum").order(SortOrder.Desc)));
                // 发布时间
                case "3" -> builder.sort(s -> s.field(f -> f.field("createTime").order(SortOrder.Desc)));
            }
        } else {
            builder.sort(s -> s.field(f -> f.field("hotScore").order(SortOrder.Desc)));
        }
        // 返回查询条件
        return builder;
    }

    /**
     * 解析查询结果
     *
     * @param searchResponse 响应对象
     * @return 响应结果vo
     */
    private AlbumSearchResponseVo getSearchResult(SearchResponse<AlbumInfoIndex> searchResponse) {
        // 响应结果对象初始化
        AlbumSearchResponseVo albumSearchResponseVo = new AlbumSearchResponseVo();
        // 获取命中的数据
        HitsMetadata<AlbumInfoIndex> hits = searchResponse.hits();
        // 获取总数据量
        long total = hits.total().value();
        albumSearchResponseVo.setTotal(total);

        // 解析数据
        List<AlbumInfoIndexVo> list = hits.hits().stream().map(hit -> {
            // 获取原始数据
            AlbumInfoIndex albumInfoIndex = hit.source();
            // 获取高亮数据
            Map<String, List<String>> highlight = hit.highlight();
            if (highlight != null) {
                List<String> highLightList = highlight.get("albumTitle");
                if (highLightList != null) {
                    String albumTitle =
                            highLightList.stream().collect(Collectors.joining(","));
                    albumInfoIndex.setAlbumTitle(albumTitle);
                }
            }
            // 返回结果初始化
            AlbumInfoIndexVo albumInfoIndexVo = new AlbumInfoIndexVo();
            BeanUtils.copyProperties(albumInfoIndex, albumInfoIndexVo);
            // 返回
            return albumInfoIndexVo;
        }).collect(Collectors.toList());
        // 保存
        albumSearchResponseVo.setList(list);
        // 返回
        return albumSearchResponseVo;
    }
}
